package org.codehaus.activemq;

import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.management.j2ee.statistics.Stats;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.broker.BrokerContainerFactory;
import org.codehaus.activemq.broker.impl.BrokerClientImpl;
import org.codehaus.activemq.broker.impl.BrokerConnectorImpl;
import org.codehaus.activemq.broker.impl.BrokerContainerFactoryImpl;
import org.codehaus.activemq.jndi.JNDIBaseStorable;
import org.codehaus.activemq.management.JMSStatsImpl;
import org.codehaus.activemq.management.StatsCapable;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQTopic;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DefaultWireFormat;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportChannelFactory;
import org.codehaus.activemq.transport.TransportChannelListener;
import org.codehaus.activemq.transport.TransportChannelProvider;
import org.codehaus.activemq.transport.vm.VmTransportChannel;
import org.codehaus.activemq.util.IdGenerator;

public class ActiveMQConnectionFactory extends JNDIBaseStorable
  implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, Service, StatsCapable
{
  private static final Log log = LogFactory.getLog(ActiveMQConnectionFactory.class);
  protected static BrokerContainer container;
  private static Map brokers = new HashMap();
  protected String userName;
  protected String password;
  protected String brokerURL;
  protected String clientID;
  private boolean useEmbeddedBroker;
  private List startedEmbeddedBrokers = new ArrayList();

  private JMSStatsImpl stats = new JMSStatsImpl();
  private WireFormat wireFormat = new DefaultWireFormat();
  private IdGenerator idGenerator = new IdGenerator();
  private int connectionCount;
  private BrokerContainerFactory brokerContainerFactory;

  public ActiveMQConnectionFactory()
  {
    this.userName = "defaultUser";
    this.password = "defaultPassword";
    this.brokerURL = "tcp://localhost:61616";
  }

  public ActiveMQConnectionFactory(String brokerURL) {
    this();
    this.brokerURL = brokerURL;
  }

  public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
    this.userName = userName;
    this.password = password;
    this.brokerURL = brokerURL;
  }

  public Stats getStats() {
    return this.stats;
  }

  public JMSStatsImpl getFactoryStats() {
    return this.stats;
  }

  public String getBrokerURL()
  {
    return this.brokerURL;
  }

  public void setBrokerURL(String brokerURL)
  {
    this.brokerURL = brokerURL;
  }

  public String getClientID()
  {
    return this.clientID;
  }

  public void setClientID(String clientID)
  {
    this.clientID = clientID;
  }

  public String getPassword()
  {
    return this.password;
  }

  public void setPassword(String password)
  {
    this.password = password;
  }

  public String getUserName()
  {
    return this.userName;
  }

  public void setUserName(String userName)
  {
    this.userName = userName;
  }

  public boolean isUseEmbeddedBroker()
  {
    return this.useEmbeddedBroker;
  }

  public void setUseEmbeddedBroker(boolean useEmbeddedBroker)
  {
    this.useEmbeddedBroker = useEmbeddedBroker;
  }

  public WireFormat getWireFormat() {
    return this.wireFormat;
  }

  public void setWireFormat(WireFormat wireFormat)
  {
    this.wireFormat = wireFormat;
  }

  public BrokerContainerFactory getBrokerContainerFactory() {
    if (this.brokerContainerFactory == null) {
      this.brokerContainerFactory = new BrokerContainerFactoryImpl();
    }
    return this.brokerContainerFactory;
  }

  public void setBrokerContainerFactory(BrokerContainerFactory brokerContainerFactory) {
    this.brokerContainerFactory = brokerContainerFactory;
  }

  protected void buildFromProperties(Properties props)
  {
    this.userName = props.getProperty("userName", this.userName);
    this.password = props.getProperty("password", this.password);
    this.brokerURL = props.getProperty("brokerURL", this.brokerURL);
    this.clientID = props.getProperty("clientID");
    this.useEmbeddedBroker = getBoolean(props, "useEmbeddedBroker");
  }

  protected void populateProperties(Properties props)
  {
    props.put("userName", this.userName);
    props.put("password", this.password);
    props.put("brokerURL", this.brokerURL);
    if (this.clientID != null) {
      props.put("clientID", this.clientID);
    }
    props.put("useEmbeddedBroker", this.useEmbeddedBroker ? "true" : "false");
  }

  protected boolean getBoolean(Properties props, String key)
  {
    String value = props.getProperty(key);
    return (value != null) && (value.equalsIgnoreCase("true"));
  }

  public Connection createConnection()
    throws JMSException
  {
    return createConnection(this.userName, this.password);
  }

  public Connection createConnection(String userName, String password)
    throws JMSException
  {
    ActiveMQConnection connection = new ActiveMQConnection(this, userName, password, createTransportChannel(this.brokerURL));
    if ((this.clientID != null) && (this.clientID.length() > 0)) {
      connection.setClientID(this.clientID);
    }
    return connection;
  }

  public QueueConnection createQueueConnection()
    throws JMSException
  {
    return createQueueConnection(this.userName, this.password);
  }

  public QueueConnection createQueueConnection(String userName, String password)
    throws JMSException
  {
    return (QueueConnection)createConnection(userName, password);
  }

  public TopicConnection createTopicConnection()
    throws JMSException
  {
    return createTopicConnection(this.userName, this.password);
  }

  public TopicConnection createTopicConnection(String userName, String password)
    throws JMSException
  {
    return (TopicConnection)createConnection(userName, password);
  }

  public void start()
    throws JMSException
  {
  }

  public synchronized void stop()
    throws JMSException
  {
    for (Iterator iter = this.startedEmbeddedBrokers.iterator(); iter.hasNext(); ) {
      String uri = (String)iter.next();
      unregisterBroker(uri);
    }
    if (container != null) {
      container.stop();
      container = null;
    }
  }

  protected TransportChannel createTransportChannel(String theURLString)
    throws JMSException
  {
    URI uri = createURI(theURLString);

    TransportChannelFactory factory = TransportChannelProvider.getFactory(uri);

    BrokerConnector brokerConnector = null;
    boolean created = false;
    boolean embedServer = (isUseEmbeddedBroker()) || (factory.requiresEmbeddedBroker());
    if (embedServer) {
      synchronized (this) {
        brokerConnector = (BrokerConnector)brokers.get(theURLString);
        if (brokerConnector == null) {
          brokerConnector = createBrokerConnector(theURLString);
          registerBroker(theURLString, brokerConnector);
          this.startedEmbeddedBrokers.add(theURLString);
          created = true;
        }
      }
    }
    TransportChannel transportChannel = factory.create(getWireFormat(), uri);
    if (embedServer) {
      return ensureServerIsAvailable(uri, transportChannel, brokerConnector, created);
    }
    return transportChannel;
  }

  public static synchronized void registerBroker(String theURLString, BrokerConnector brokerConnector) {
    brokers.put(theURLString, brokerConnector);
  }

  public static synchronized void unregisterBroker(String theURLString) {
    brokers.remove(theURLString);
  }

  protected synchronized BrokerContainer getContainer(String brokerName) throws JMSException {
    if (container == null) {
      container = getBrokerContainerFactory().createBrokerContainer(brokerName);
      container.start();
    }
    return container;
  }

  protected BrokerConnector createBrokerConnector(String url) throws JMSException
  {
    BrokerConnector brokerConnector = new BrokerConnectorImpl(getContainer(url), url, getWireFormat());
    brokerConnector.start();

    log.info("Embedded JMS Broker has started");
    try {
      Thread.sleep(1000L);
    }
    catch (InterruptedException e) {
      System.out.println("Caught: " + e);
      e.printStackTrace();
    }
    return brokerConnector;
  }

  protected TransportChannel ensureServerIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException
  {
    ensureVmServerIsAvailable(channel, brokerConnector);
    if (channel.isMulticast()) {
      return ensureMulticastChannelIsAvailable(remoteLocation, channel, brokerConnector, created);
    }
    return channel;
  }

  private void ensureVmServerIsAvailable(TransportChannel channel, BrokerConnector brokerConnector) throws JMSException {
    if (((channel instanceof VmTransportChannel)) && ((brokerConnector instanceof TransportChannelListener))) {
      TransportChannelListener listener = (TransportChannelListener)brokerConnector;
      VmTransportChannel answer = (VmTransportChannel)channel;

      listener.addClient(answer.createServerSide());
    }
  }

  protected TransportChannel ensureMulticastChannelIsAvailable(URI remoteLocation, TransportChannel channel, BrokerConnector brokerConnector, boolean created) throws JMSException {
    if (created) {
      BrokerConnectorImpl brokerImpl = (BrokerConnectorImpl)brokerConnector;

      BrokerClientImpl client = new BrokerClientImpl();
      client.initialize(brokerImpl, channel);
      channel.start();
      String brokerClientID = createMulticastClientID();
      channel.setClientID(brokerClientID);

      ConnectionInfo info = new ConnectionInfo();
      info.setHostName(IdGenerator.getHostName());
      info.setClientId(brokerClientID);
      info.setStarted(true);
      client.consumeConnectionInfo(info);

      ConsumerInfo consumerInfo = new ConsumerInfo();
      consumerInfo.setDestination(new ActiveMQTopic(">"));
      consumerInfo.setNoLocal(true);
      consumerInfo.setClientId(brokerClientID);
      consumerInfo.setConsumerId(this.idGenerator.generateId());
      consumerInfo.setId(consumerInfo.getConsumerId());
      consumerInfo.setStarted(true);
      client.consumeConsumerInfo(consumerInfo);

      consumerInfo = new ConsumerInfo();
      consumerInfo.setDestination(new ActiveMQQueue(">"));
      consumerInfo.setNoLocal(true);
      consumerInfo.setClientId(brokerClientID);
      consumerInfo.setConsumerId(this.idGenerator.generateId());
      consumerInfo.setId(consumerInfo.getConsumerId());
      consumerInfo.setStarted(true);
      client.consumeConsumerInfo(consumerInfo);
    }

    URI localURI = createURI("vm", remoteLocation);
    TransportChannel localChannel = TransportChannelProvider.create(getWireFormat(), localURI);
    ensureVmServerIsAvailable(localChannel, brokerConnector);
    return localChannel;
  }

  protected String createMulticastClientID()
  {
    return this.idGenerator.generateId();
  }
  protected URI createURI(String protocol, URI uri) throws JMSException {
    JMSException jmsEx;
    try { return new URI(protocol, uri.getRawSchemeSpecificPart(), uri.getFragment());
    } catch (URISyntaxException e)
    {
      jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
      jmsEx.setLinkedException(e);
    }throw jmsEx;
  }

  protected URI createURI(String uri) throws JMSException {
    JMSException jmsEx;
    try {
      if (uri == null) {
        throw new JMSException("The connection URI must be specified!");
      }
      return new URI(uri);
    }
    catch (URISyntaxException e) {
      jmsEx = new JMSException("the URL string is badly formated:", e.getMessage());
      jmsEx.setLinkedException(e);
    }throw jmsEx;
  }

  synchronized void onConnectionClose(ActiveMQConnection connection)
    throws JMSException
  {
    if (--this.connectionCount <= 0)
    {
      stop();
    }
  }

  synchronized void onConnectionCreate(ActiveMQConnection connection)
  {
    this.connectionCount += 1;
  }

@Override
public JMSContext createContext() {
	// TODO Auto-generated method stub
	return null;
}

@Override
public JMSContext createContext(int arg0) {
	// TODO Auto-generated method stub
	return null;
}

@Override
public JMSContext createContext(String arg0, String arg1) {
	// TODO Auto-generated method stub
	return null;
}

@Override
public JMSContext createContext(String arg0, String arg1, int arg2) {
	// TODO Auto-generated method stub
	return null;
}
}